package com.lyon.dmeo.storage.client.raft;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.SerializeUtil;
import com.lyon.demo.protocol.api.core.command.ResponseFuture;
import com.lyon.demo.storage.client.api.core.core.Entry;
import com.lyon.demo.storage.client.api.core.core.StorageClient;
import com.lyon.demo.storage.client.api.core.core.Tag;
import com.lyon.demo.storage.client.api.core.command.request.HeartbeatRequest;
import com.lyon.demo.storage.client.api.core.command.request.PushEntryRequest;
import com.lyon.demo.storage.client.api.core.command.request.VoteRequest;
import com.lyon.demo.storage.client.api.core.command.response.HeartBeatResponse;
import com.lyon.demo.storage.client.api.core.command.response.PushEntryResponse;
import com.lyon.demo.storage.client.api.core.command.response.VoteResponse;
import com.lyon.demo.storage.client.api.core.config.DLedgerConfig;
import com.lyon.demo.storage.client.api.core.core.AbstractStoreService;
import com.lyon.demo.storage.client.api.core.core.MemberState;
import com.lyon.demo.storage.client.api.core.exception.DLedgerException;
import com.lyon.demo.storage.client.api.core.protocol.core.RoleChangeHandler;
import com.lyon.demo.storage.client.api.core.protocol.core.RoleChangeSupport;
import com.lyon.demo.storage.common.spi.DefaultSpiLoader;
import com.lyon.demo.storage.common.thread.StartupShutdownAble;
import com.lyon.dmeo.storage.client.raft.core.proccessor.DLedgerRaftProtocolHandler;
import com.lyon.dmeo.storage.client.raft.core.AppendEntryResponse;
import com.lyon.dmeo.storage.client.raft.endpoint.RpcRemoteService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Server-side entry point of a DLedger raft node.
 *
 * <p>Wires together the RPC endpoint, the leader election, the entry pusher and the entry store,
 * and exposes them through the {@link StorageClient} read/write API, the
 * {@link DLedgerRaftProtocolHandler} request callbacks and the {@link RoleChangeSupport} hook.
 *
 * @author LeeYan9
 * @since 2022-05-19
 */

@SuppressWarnings({"rawtypes", "unused", "AlibabaClassNamingShouldBeCamel", "unchecked"})
@Getter
@Slf4j
public class DLedgerServer implements DLedgerRaftProtocolHandler, RoleChangeSupport, StorageClient, StartupShutdownAble {

    private final DLedgerLeaderElection dLedgerLeaderElection;
    private final RpcRemoteService rpcRemoteService;
    private final DLedgerConfig dLedgerConfig;
    private final MemberState memberState;
    private final DLedgerEntryPusher dLedgerEntryPusher;
    private final AbstractStoreService store;

    /**
     * Builds the server and all of its collaborators from the given configuration.
     *
     * <p>The assignment order matters: {@code this} is handed to {@link RpcRemoteService} and
     * {@link DLedgerLeaderElection} while {@code store} and {@code dLedgerEntryPusher} are still
     * unassigned, so only {@code dLedgerConfig} and {@code memberState} are set at that point.
     *
     * @param dLedgerConfig node configuration; also selects the store implementation via
     *                      {@code getStoreStrategy()}
     */
    public DLedgerServer(DLedgerConfig dLedgerConfig) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = new MemberState(dLedgerConfig);
        this.rpcRemoteService = new RpcRemoteService(this);
        this.dLedgerLeaderElection = new DLedgerLeaderElection(this);
        // The store implementation is resolved through SPI by the configured strategy name.
        this.store = DefaultSpiLoader.INSTANCE.loader(AbstractStoreService.class, dLedgerConfig.getStoreStrategy());
        this.store.setMemberState(memberState);
        this.dLedgerEntryPusher = new DLedgerEntryPusher(memberState, dLedgerConfig, rpcRemoteService, store);
    }

    /**
     * Starts the RPC endpoint, the leader election and the entry pusher, in that order.
     */
    @Override
    public void startup() {
        rpcRemoteService.startup();
        dLedgerLeaderElection.startup();
        dLedgerEntryPusher.startup();
    }


    /**
     * Stops the RPC endpoint and the leader election.
     */
    @Override
    public void shutdown() {
        rpcRemoteService.shutdown();
        dLedgerLeaderElection.shutdown();
        // NOTE(review): dLedgerEntryPusher is started in startup() but never stopped here.
        // If DLedgerEntryPusher exposes a shutdown(), it should probably be called as well - confirm.
    }

    /**
     * Appends {@code data} as a new entry without a tag.
     *
     * @param data payload; serialized with {@link SerializeUtil#serialize(Object)}
     * @return a future that yields the appended {@link Entry} once the append has been acknowledged,
     *         or fails if the append could not be started or was not acknowledged successfully
     */
    @Override
    public <T, R> CompletableFuture<R> commit(T data) {
        try {
            return appendAsLeader(() -> buildEntry(data));
        } catch (Exception e) {
            log.error("", e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Looks up an entry by its index in the store.
     *
     * @param index entry index passed to {@code store.hitEntry(long)}
     * @return a future already completed with the entry, or completed exceptionally if the lookup threw
     */
    @Override
    public <R> CompletableFuture<R> get(long index) {
        CompletableFuture<R> future = new CompletableFuture<>();
        try {
            Entry entry = store.hitEntry(index);
            future.complete((R) entry);
        } catch (Exception e) {
            log.error("", e);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Looks up an entry by tag and key in the store.
     *
     * @param tag entry tag; a blank or {@code null} tag falls back to {@link Tag#DEFAULT}
     * @param key entry key
     * @return a future already completed with the entry as returned by the store, or completed
     *         exceptionally if the lookup threw
     */
    @Override
    public <R, K> CompletableFuture<R> get(String tag, K key) {
        CompletableFuture<R> future = new CompletableFuture<>();
        try {
            Entry entry = store.hitEntry(CharSequenceUtil.isBlank(tag) ? Tag.DEFAULT : tag, key);
            // Casting null is a no-op, so a missing entry still completes the future with null.
            future.complete((R) entry);
        } catch (Exception e) {
            log.error("", e);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Looks up an entry by key under the default tag.
     *
     * @param key entry key
     * @return see {@link #get(String, Object)}
     */
    @Override
    public <R, K> CompletableFuture<R> get(K key) {
        return get(null, key);
    }

    /**
     * Appends a key/value pair under the default tag.
     *
     * @param key   entry key
     * @param value entry value
     * @return see {@link #commitEntry(String, Object, Object)}
     */
    @Override
    public <R, K, V> CompletableFuture<R> commitEntry(K key, V value) {
        return commitEntry(null, key, value);
    }

    /**
     * Appends a key/value pair under the given tag.
     *
     * @param tag   entry tag; a blank or {@code null} tag falls back to {@link Tag#DEFAULT}
     * @param key   entry key
     * @param value entry value
     * @return a future that yields the appended {@link Entry} once the append has been acknowledged,
     *         or fails if the append could not be started or was not acknowledged successfully
     */
    @Override
    public <R, K, V> CompletableFuture<R> commitEntry(String tag, K key, V value) {
        try {
            return appendAsLeader(() -> buildEntry(tag, key, value));
        } catch (Exception e) {
            log.error("", e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Appends the supplied entry to the local store and waits for the pusher's acknowledgement.
     *
     * <p>The returned future is completed from inside the acknowledgement callback. An exception
     * thrown by {@link #resolverAppendFuture(AppendEntryResponse, Throwable)} therefore reaches the
     * caller instead of being lost on a discarded completion stage.
     *
     * @param supplier creates the entry to append; must not be {@code null}
     * @return a future yielding the appended entry on a successful acknowledgement; failed if the
     *         store left the entry index at {@code -1} or the acknowledgement reported a failure
     */
    private <R> CompletableFuture<R> appendAsLeader(Supplier<Entry> supplier) {
        Assert.notNull(supplier);
        Entry entry = supplier.get();
        store.appendAsLeader(entry);
        // An index of -1 after the store call is treated as "the append did not happen".
        if (entry.getIndex() == -1) {
            return CompletableFuture.failedFuture(new RuntimeException("appendAsLeader ERROR"));
        }
        ResponseFuture<AppendEntryResponse> appendFuture = dLedgerEntryPusher.waitAck(entry);
        CompletableFuture<R> result = new CompletableFuture<>();
        appendFuture.whenComplete((response, throwable) -> {
            try {
                resolverAppendFuture(response, throwable);
                result.complete((R) entry);
            } catch (Exception ex) {
                result.completeExceptionally(ex);
            }
        });
        return result;
    }

    /**
     * Validates the outcome of an append acknowledgement.
     *
     * @param response  acknowledgement received from the pusher; may be {@code null} on failure
     * @param throwable failure raised while waiting for the acknowledgement, or {@code null}
     * @throws DLedgerException if waiting failed, no response was delivered, or the response code
     *                          is anything other than {@code SUCCESS}
     */
    private void resolverAppendFuture(AppendEntryResponse response, Throwable throwable) {
        if (throwable != null) {
            log.error("", throwable);
            throw new DLedgerException("追加数据异常");
        }
        if (response == null) {
            // Neither a failure nor a response was delivered; report it instead of an NPE below.
            throw new DLedgerException("追加数据异常");
        }
        switch (response.getCode()) {
            case SUCCESS:
                break;
            case TIMI_OUT:
                throw new DLedgerException("追加数据超时");
            case TERM_CHANGED:
                throw new DLedgerException("任期变更,追加数据失败");
            case UNKNOWN_ERROR:
                throw new DLedgerException("追加数据异常");
            default:
                throw new DLedgerException("追加数据失败,未知结果类型");
        }
    }

    /**
     * Creates an untagged entry whose body is the serialized {@code data}.
     *
     * @param data payload to serialize into the entry body
     * @return the new entry
     */
    private <T> Entry buildEntry(T data) {
        Entry entry = new Entry();
        entry.setBody(SerializeUtil.serialize(data));
        return entry;
    }

    /**
     * Creates a tagged entry whose body is the serialized {@code (key, value)} pair.
     *
     * @param tag   entry tag; a blank or {@code null} tag falls back to {@link Tag#DEFAULT}
     * @param key   key stored as the first element of the pair
     * @param value value stored as the second element of the pair
     * @return the new entry
     */
    private <K, V> Entry buildEntry(String tag, K key, V value) {
        Entry entry = new Entry();
        Pair<K, V> data = new Pair<>(key, value);
        entry.setTag(CharSequenceUtil.isBlank(tag) ? Tag.DEFAULT : tag);
        entry.setBody(SerializeUtil.serialize(data));
        return entry;
    }

    /**
     * Registers a role-change handler with the leader election component.
     *
     * @param handler handler to register
     */
    @Override
    public void addRoleChangeHandler(RoleChangeHandler handler) {
        this.dLedgerLeaderElection.addRoleChangeHandler(handler);
    }

    /**
     * Delegates an incoming heartbeat request to the leader election component.
     *
     * @param heartbeatRequest incoming heartbeat
     * @return the future produced by the leader election component
     */
    @Override
    public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartbeatRequest heartbeatRequest) {
        return dLedgerLeaderElection.handleHeartbeat(heartbeatRequest);
    }

    /**
     * Delegates an incoming push-entry request to the entry pusher.
     *
     * @param entryRequest incoming push request
     * @return the future produced by the entry pusher
     */
    @Override
    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest entryRequest) {
        return dLedgerEntryPusher.handlePush(entryRequest);
    }

    /**
     * Delegates an incoming vote request to the leader election component, passing {@code false}
     * as the second argument.
     *
     * @param voteRequest incoming vote request
     * @return the future produced by the leader election component
     */
    @Override
    public CompletableFuture<VoteResponse> handleVote(VoteRequest voteRequest) {
        return dLedgerLeaderElection.handleVote(voteRequest, false);
    }
}
